Updating package managers#346
Conversation
| # def test_get_package_manager_azure_linux_4_and_rhel10_not_supported(self): | ||
| # """Test that Azure Linux 4 and RHEL 10 log unsupported message""" | ||
| # self.backup_platform_system = platform.system | ||
| # self.backup_linux_distribution = self.envlayer.platform.linux_distribution | ||
| # self.backup_distro_os_release_attr = distro.os_release_attr | ||
| # | ||
| # platform.system = self.mock_platform_system | ||
| # test_input_output_table = [ | ||
| # [self.mock_linux_distribution_to_return_azure_linux_4, self.mock_distro_os_release_attr_return_azure_linux_4, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Microsoft Azure Linux][Version=4.0][Code=]\n"], | ||
| # [self.mock_linux_distribution_to_return_rhel_10, self.mock_distro_os_release_attr_return_rhel_10, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Red Hat][Version=10.0][Code=abc]\n"], | ||
| # ] | ||
| # | ||
| # for row in test_input_output_table: | ||
| # self.envlayer.platform.linux_distribution = row[0] | ||
| # distro.os_release_attr = row[1] | ||
| # | ||
| # captured_output = io.StringIO() | ||
| # sys.stdout = captured_output | ||
| # result = self.envlayer.get_package_manager() | ||
| # sys.stdout = sys.__stdout__ | ||
| # self.assertEqual(row[2], captured_output.getvalue()) | ||
| # self.assertEqual(result, "") | ||
| # | ||
| # # restore | ||
| # self.__restore_mocks() |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #346 +/- ##
==========================================
- Coverage 93.84% 93.59% -0.25%
==========================================
Files 105 105
Lines 18218 18337 +119
==========================================
+ Hits 17096 17162 +66
- Misses 1122 1175 +53
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR introduces a new “update certificates” flow intended to run during patch installation, adds related constants/error codes, and adjusts tests around package manager detection.
Changes:
- Adds certificate-update plumbing to
PackageManager(mokutil detection, cert status retrieval helpers, andupdate_certs()orchestration). - Invokes
package_manager.update_certs()at the start of patch installation. - Adds
CERTIFICATE_UPDATEerror code + certificate-related constants, and comments out an EnvLayer unsupported-distro test.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| src/core/tests/Test_EnvLayer.py | Comments out the Azure Linux 4 / RHEL 10 unsupported-distro test. |
| src/core/src/package_managers/ZypperPackageManager.py | Adds abstract stub cert-update methods that override base behavior. |
| src/core/src/package_managers/YumPackageManager.py | Adds abstract stub cert-update methods that override base behavior. |
| src/core/src/package_managers/TdnfPackageManager.py | Adds abstract stub cert-update methods that override base behavior. |
| src/core/src/package_managers/PackageManager.py | Adds shared cert-update orchestration and mokutil/cert status command constants. |
| src/core/src/package_managers/AptitudePackageManager.py | Adds apt/fwupd-based cert update flow, but also overrides key base methods with abstract stubs. |
| src/core/src/core_logic/PatchInstaller.py | Calls package_manager.update_certs() during installation start. |
| src/core/src/bootstrap/Constants.py | Adds Certificates enum + LATEST_CERTIFICATE_TIMESTAMP + CERTIFICATE_UPDATE error code. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # self.backup_platform_system = platform.system | ||
| # self.backup_linux_distribution = self.envlayer.platform.linux_distribution | ||
| # self.backup_distro_os_release_attr = distro.os_release_attr | ||
| # | ||
| # platform.system = self.mock_platform_system | ||
| # test_input_output_table = [ | ||
| # [self.mock_linux_distribution_to_return_azure_linux_4, self.mock_distro_os_release_attr_return_azure_linux_4, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Microsoft Azure Linux][Version=4.0][Code=]\n"], | ||
| # [self.mock_linux_distribution_to_return_rhel_10, self.mock_distro_os_release_attr_return_rhel_10, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Red Hat][Version=10.0][Code=abc]\n"], | ||
| # ] | ||
| # | ||
| # for row in test_input_output_table: | ||
| # self.envlayer.platform.linux_distribution = row[0] | ||
| # distro.os_release_attr = row[1] | ||
| # | ||
| # captured_output = io.StringIO() | ||
| # sys.stdout = captured_output | ||
| # result = self.envlayer.get_package_manager() | ||
| # sys.stdout = sys.__stdout__ | ||
| # self.assertEqual(row[2], captured_output.getvalue()) | ||
| # self.assertEqual(result, "") | ||
| # | ||
| # # restore | ||
| # self.__restore_mocks() | ||
|
|
| reboot_manager.start_reboot_if_required_and_time_available(maintenance_window.get_remaining_time_in_minutes(None, False)) | ||
|
|
||
| # Update certs if available | ||
| self.package_manager.update_certs() |
| self.get_kek_cert_status_cmd = "mokutil --kek | grep 'CN='" # todo: review if this should be changed to mokutil --kek 2>&1 | grep -oP 'CN=\K[^,]+' | sort -u | tr '\n' '; ' || echo "none" | ||
| self.get_db_cert_status_cmd = "mokutil --db | grep 'CN='" |
| @abstractmethod | ||
| def update_certs(self): | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_install_mokutil(self): | ||
| """ Attempts to install mokutil """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False): | ||
| # """ Fetches the status of the certificates on the machine """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def is_latest_cert_installed(self, status_code, status_output): | ||
| """ Checks if the latest certs are already installed on the machine based on mokutil output """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_update_certs(self): | ||
| """ Attempts to update certificate status """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def are_latest_certs_present(self): | ||
| pass |
| # region Update certificates in factory defaults | ||
| @abstractmethod | ||
| def update_certs(self): | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_install_mokutil(self): | ||
| """ Attempts to install mokutil """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False): | ||
| # """ Fetches the status of the certificates on the machine """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def is_latest_cert_installed(self, status_code, status_output): | ||
| """ Checks if the latest certs are already installed on the machine based on mokutil output """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_update_certs(self): | ||
| """ Attempts to update certificate status """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def are_latest_certs_present(self): | ||
| pass | ||
| # endregion |
| @abstractmethod | ||
| def update_certs(self): | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_install_mokutil(self): | ||
| """ Attempts to install mokutil """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False): | ||
| # """ Fetches the status of the certificates on the machine """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def is_latest_cert_installed(self, status_code, status_output): | ||
| """ Checks if the latest certs are already installed on the machine based on mokutil output """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_update_certs(self): | ||
| """ Attempts to update certificate status """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def are_latest_certs_present(self): | ||
| pass |
| # region Update certificates in factory defaults | ||
| @abstractmethod | ||
| def try_install_mokutil(self): | ||
| # type: () -> bool | ||
| """ Attempts to install mokutil """ | ||
| cmd = self.install_mokutil_cmd | ||
| self.composite_logger.log_verbose('[APM] Invoking install mokutil command [Command={0}]'.format(cmd)) | ||
| out, code = self.invoke_package_manager_advanced(cmd, raise_on_exception=False) | ||
| self.composite_logger.log_debug('[APM] Invoked install mokutil exists. [Command={0}][Code={1}][Output={2}]'.format(cmd, str(code), str(out))) | ||
| return code == 0 | ||
|
|
||
| @abstractmethod | ||
| def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False): | ||
| # """ Fetches the status of the certificates on the machine """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def is_latest_cert_installed(self, status_code, status_output): | ||
| """ Checks if the latest certs are already installed on the machine based on mokutil output """ | ||
| pass | ||
|
|
||
| def try_update_certs(self): | ||
| """ Attempts to update certificate status """ | ||
| self.composite_logger.log("[APM][Certs] Starting cert update flow.") | ||
| if self.are_latest_certs_present(): | ||
| self.composite_logger.log("[APM][Certs] Latest certs already present. Skipping.") | ||
| return True | ||
|
|
||
| success = False | ||
| try: | ||
| # shell/file steps | ||
| self.__run_cert_shell_command(self.add_proposed_repo_cmd, "AddProposedRepo", raise_on_error=True) | ||
| self.__run_cert_shell_command(self.create_proposed_pin_cmd, "CreateAptPin", raise_on_error=True) | ||
|
|
||
| # apt/package-manager steps | ||
| self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdateAfterRepo", raise_on_error=True) | ||
| self.__run_cert_apt_command(self.install_fwupd_from_proposed_cmd, "InstallFwupdFromProposed", raise_on_error=True) | ||
|
|
||
| # shell fwupd steps | ||
| self.__run_cert_shell_command(self.fwupd_refresh_cmd, "FwupdRefresh", raise_on_error=True) | ||
| self.__run_cert_shell_command(self.fwupd_update_cmd, "FwupdUpdate", raise_on_error=True) | ||
|
|
||
| if self.are_latest_certs_present(): | ||
| self.composite_logger.log("[APM][Certs] Cert update completed and verified.") | ||
| success = True | ||
| else: | ||
| msg = "[APM][Certs] Cert update commands completed but latest certs not detected." | ||
| self.composite_logger.log_error(msg) | ||
| self.status_handler.add_error_to_status(msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE) | ||
|
|
||
| except Exception as error: | ||
| self.composite_logger.log_error( | ||
| "[APM][Certs] Exception during cert update flow. [Error={0}]".format(repr(error))) | ||
|
|
||
| finally: | ||
| # best-effort cleanup | ||
| self.__run_cert_shell_command(self.remove_proposed_pin_cmd, "CleanupAptPin", raise_on_error=False) | ||
| self.__run_cert_shell_command(self.remove_proposed_repo_cmd, "CleanupProposedRepo", raise_on_error=False) | ||
| self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdateAfterCleanup", raise_on_error=False) | ||
|
|
||
| if success: | ||
| self.status_handler.set_reboot_pending(self.is_reboot_pending()) | ||
|
|
||
| return success | ||
|
|
||
| @abstractmethod | ||
| def are_latest_certs_present(self): | ||
| pass |
| self.proposed_repo_file_path = "/etc/apt/sources.list.d/proposed.list" | ||
| self.proposed_pin_file_path = "/etc/apt/preferences.d/proposed" | ||
| self.add_proposed_repo_cmd = ( | ||
| "bash -c 'echo \"deb http://archive.ubuntu.com/ubuntu/ " |
| import re | ||
| import shutil | ||
| import sys | ||
| from abc import abstractmethod |
| import os | ||
| import re | ||
| import time | ||
| from abc import abstractmethod |
| import json | ||
| import os | ||
| import re | ||
| from abc import abstractmethod |
| pass | ||
|
|
||
| # region Update certificates in factory defaults | ||
| def update_certs(self): |
There was a problem hiding this comment.
Lets add VM security type condition too. This update should not be done in CVMs in current form.
| self.proposed_repo_file_path = "/etc/apt/sources.list.d/proposed.list" | ||
| self.proposed_pin_file_path = "/etc/apt/preferences.d/proposed" | ||
| self.add_proposed_repo_cmd = ( | ||
| "bash -c 'echo \"deb http://archive.ubuntu.com/ubuntu/ " |
There was a problem hiding this comment.
I agree with the copilot review here. Can we use https
| self.composite_logger.log("[APM][Certs] Cert update completed and verified.") | ||
| success = True | ||
| else: | ||
| msg = "[APM][Certs] Cert update commands completed but latest certs not detected." |
There was a problem hiding this comment.
What does this condition actually mean? Like there was no error in the cert install, but the latest certs weren't available? Could we have updated to something other than the latest certs, and in that case, would we need a reboot below? Just trying to understand what could go wrong that would lead us to not end up in the exception case, but still not install the certs
| step_name, str(command), str(code), str(out)) | ||
| self.composite_logger.log_error(msg) | ||
| self.status_handler.add_error_to_status(msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE) | ||
| if raise_on_error: |
There was a problem hiding this comment.
When would we not want to raise_on_error?
|
|
||
| def __run_cert_apt_command(self, command, step_name, raise_on_error=False): | ||
| """Run apt/dpkg commands through package-manager wrapper.""" | ||
| out, code = self.invoke_package_manager_advanced(command, raise_on_exception=False) |
There was a problem hiding this comment.
Same question as below, could this just lead us to failing silently?
| if is_mokutil_installed or try_install_mokutil_status: | ||
| self.try_update_certs() | ||
| else: | ||
| error_msg = "[PM][UpdateCerts] Mokutil is not installed or could not be installed. Cannot fetch current certs." |
There was a problem hiding this comment.
nit: For clarity of logging, I think this should be and. If we're in this case, both mokutil is not installed AND it could not be installed.
|
|
||
| # restore | ||
| self.__restore_mocks() | ||
| # def test_get_package_manager_azure_linux_4_and_rhel10_not_supported(self): |
There was a problem hiding this comment.
Why are we commenting out tests and not just deleting? If we need them in the future, we can just grab them from past commits.
| pass | ||
|
|
||
| # region Update certificates in factory defaults | ||
| def update_certs(self): |
There was a problem hiding this comment.
Are there any tests for the new cert code?
Updating package managers